/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.test.checkpointing;

import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.RpcOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.RichAllWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.util.RestartStrategyUtils;
import org.apache.flink.test.checkpointing.utils.FailingSource;
import org.apache.flink.test.checkpointing.utils.IntType;
import org.apache.flink.test.checkpointing.utils.ValidatingSink;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.util.Collector;
import org.apache.flink.util.TestLogger;

import org.junit.ClassRule;
import org.junit.Test;

import java.time.Duration;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

/**
 * This verifies that checkpointing works correctly with event time windows.
 *
 * <p>This is a version of {@link EventTimeWindowCheckpointingITCase} for All-Windows.
 */
@SuppressWarnings("serial")
public class EventTimeAllWindowCheckpointingITCase extends TestLogger {

    private static final int PARALLELISM = 4;

    @ClassRule
    public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE =
            new MiniClusterWithClientResource(
                    new MiniClusterResourceConfiguration.Builder()
                            .setConfiguration(getConfiguration())
                            .setNumberTaskManagers(2)
                            .setNumberSlotsPerTaskManager(PARALLELISM / 2)
                            .build());

    private static Configuration getConfiguration() {
        Configuration config = new Configuration();
        config.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, MemorySize.parse("48m"));
        config.set(RpcOptions.LOOKUP_TIMEOUT_DURATION, Duration.ofMinutes(1));
        config.set(RpcOptions.ASK_TIMEOUT_DURATION, Duration.ofMinutes(1));
        return config;
    }

    // ------------------------------------------------------------------------

    @Test
    public void testTumblingTimeWindow() throws Exception {
        final int numElementsPerKey = 3000;
        final int windowSize = 100;
        final int numKeys = 1;

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(PARALLELISM);
        env.enableCheckpointing(100);
        RestartStrategyUtils.configureFixedDelayRestartStrategy(env, 1, 0L);

        env.addSource(
                        new FailingSource(
                                new EventTimeWindowCheckpointingITCase.KeyedEventTimeGenerator(
                                        numKeys, windowSize),
                                numElementsPerKey))
                .rebalance()
                .windowAll(TumblingEventTimeWindows.of(Duration.ofMillis(windowSize)))
                .apply(
                        new RichAllWindowFunction<
                                Tuple2<Long, IntType>,
                                Tuple4<Long, Long, Long, IntType>,
                                TimeWindow>() {

                            private boolean open = false;

                            @Override
                            public void open(OpenContext openContext) {
                                assertEquals(
                                        1,
                                        getRuntimeContext()
                                                .getTaskInfo()
                                                .getNumberOfParallelSubtasks());
                                open = true;
                            }

                            @Override
                            public void apply(
                                    TimeWindow window,
                                    Iterable<Tuple2<Long, IntType>> values,
                                    Collector<Tuple4<Long, Long, Long, IntType>> out) {

                                // validate that the function has been opened properly
                                assertTrue(open);

                                int sum = 0;
                                long key = -1;

                                for (Tuple2<Long, IntType> value : values) {
                                    sum += value.f1.value;
                                    key = value.f0;
                                }
                                out.collect(
                                        new Tuple4<>(
                                                key,
                                                window.getStart(),
                                                window.getEnd(),
                                                new IntType(sum)));
                            }
                        })
                .addSink(
                        new ValidatingSink<>(
                                new EventTimeWindowCheckpointingITCase.SinkValidatorUpdateFun(
                                        numElementsPerKey),
                                new EventTimeWindowCheckpointingITCase.SinkValidatorCheckFun(
                                        numKeys, numElementsPerKey, windowSize)))
                .setParallelism(1);

        env.execute("Tumbling Window Test");
    }

    @Test
    public void testSlidingTimeWindow() throws Exception {
        final int numElementsPerKey = 3000;
        final int windowSize = 1000;
        final int windowSlide = 100;
        final int numKeys = 1;

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(PARALLELISM);
        env.enableCheckpointing(100);
        RestartStrategyUtils.configureFixedDelayRestartStrategy(env, 1, 0L);

        env.addSource(
                        new FailingSource(
                                new EventTimeWindowCheckpointingITCase.KeyedEventTimeGenerator(
                                        numKeys, windowSlide),
                                numElementsPerKey))
                .rebalance()
                .windowAll(
                        SlidingEventTimeWindows.of(
                                Duration.ofMillis(windowSize), Duration.ofMillis(windowSlide)))
                .apply(
                        new RichAllWindowFunction<
                                Tuple2<Long, IntType>,
                                Tuple4<Long, Long, Long, IntType>,
                                TimeWindow>() {

                            private boolean open = false;

                            @Override
                            public void open(OpenContext openContext) {
                                assertEquals(
                                        1,
                                        getRuntimeContext()
                                                .getTaskInfo()
                                                .getNumberOfParallelSubtasks());
                                open = true;
                            }

                            @Override
                            public void apply(
                                    TimeWindow window,
                                    Iterable<Tuple2<Long, IntType>> values,
                                    Collector<Tuple4<Long, Long, Long, IntType>> out) {

                                // validate that the function has been opened properly
                                assertTrue(open);

                                int sum = 0;
                                long key = -1;

                                for (Tuple2<Long, IntType> value : values) {
                                    sum += value.f1.value;
                                    key = value.f0;
                                }
                                out.collect(
                                        new Tuple4<>(
                                                key,
                                                window.getStart(),
                                                window.getEnd(),
                                                new IntType(sum)));
                            }
                        })
                .addSink(
                        new ValidatingSink<>(
                                new EventTimeWindowCheckpointingITCase.SinkValidatorUpdateFun(
                                        numElementsPerKey),
                                new EventTimeWindowCheckpointingITCase.SinkValidatorCheckFun(
                                        numKeys, numElementsPerKey, windowSlide)))
                .setParallelism(1);

        env.execute("Sliding Window Test");
    }

    @Test
    public void testPreAggregatedTumblingTimeWindow() throws Exception {
        final int numElementsPerKey = 3000;
        final int windowSize = 100;
        final int numKeys = 1;

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(PARALLELISM);
        env.enableCheckpointing(100);
        RestartStrategyUtils.configureFixedDelayRestartStrategy(env, 1, 0L);

        env.addSource(
                        new FailingSource(
                                new EventTimeWindowCheckpointingITCase.KeyedEventTimeGenerator(
                                        numKeys, windowSize),
                                numElementsPerKey))
                .rebalance()
                .windowAll(TumblingEventTimeWindows.of(Duration.ofMillis(windowSize)))
                .reduce(
                        new ReduceFunction<Tuple2<Long, IntType>>() {

                            @Override
                            public Tuple2<Long, IntType> reduce(
                                    Tuple2<Long, IntType> a, Tuple2<Long, IntType> b) {

                                return new Tuple2<>(a.f0, new IntType(a.f1.value + b.f1.value));
                            }
                        },
                        new RichAllWindowFunction<
                                Tuple2<Long, IntType>,
                                Tuple4<Long, Long, Long, IntType>,
                                TimeWindow>() {

                            private boolean open = false;

                            @Override
                            public void open(OpenContext openContext) {
                                assertEquals(
                                        1,
                                        getRuntimeContext()
                                                .getTaskInfo()
                                                .getNumberOfParallelSubtasks());
                                open = true;
                            }

                            @Override
                            public void apply(
                                    TimeWindow window,
                                    Iterable<Tuple2<Long, IntType>> input,
                                    Collector<Tuple4<Long, Long, Long, IntType>> out) {

                                // validate that the function has been opened properly
                                assertTrue(open);

                                for (Tuple2<Long, IntType> in : input) {
                                    out.collect(
                                            new Tuple4<>(
                                                    in.f0,
                                                    window.getStart(),
                                                    window.getEnd(),
                                                    in.f1));
                                }
                            }
                        })
                .addSink(
                        new ValidatingSink<>(
                                new EventTimeWindowCheckpointingITCase.SinkValidatorUpdateFun(
                                        numElementsPerKey),
                                new EventTimeWindowCheckpointingITCase.SinkValidatorCheckFun(
                                        numKeys, numElementsPerKey, windowSize)))
                .setParallelism(1);

        env.execute("PreAggregated Tumbling Window Test");
    }

    @Test
    public void testPreAggregatedSlidingTimeWindow() throws Exception {
        final int numElementsPerKey = 3000;
        final int windowSize = 1000;
        final int windowSlide = 100;
        final int numKeys = 1;

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(PARALLELISM);
        env.enableCheckpointing(100);
        RestartStrategyUtils.configureFixedDelayRestartStrategy(env, 1, 0L);

        env.addSource(
                        new FailingSource(
                                new EventTimeWindowCheckpointingITCase.KeyedEventTimeGenerator(
                                        numKeys, windowSlide),
                                numElementsPerKey))
                .rebalance()
                .windowAll(
                        SlidingEventTimeWindows.of(
                                Duration.ofMillis(windowSize), Duration.ofMillis(windowSlide)))
                .reduce(
                        new ReduceFunction<Tuple2<Long, IntType>>() {

                            @Override
                            public Tuple2<Long, IntType> reduce(
                                    Tuple2<Long, IntType> a, Tuple2<Long, IntType> b) {

                                return new Tuple2<>(a.f0, new IntType(a.f1.value + b.f1.value));
                            }
                        },
                        new RichAllWindowFunction<
                                Tuple2<Long, IntType>,
                                Tuple4<Long, Long, Long, IntType>,
                                TimeWindow>() {

                            private boolean open = false;

                            @Override
                            public void open(OpenContext openContext) {
                                assertEquals(
                                        1,
                                        getRuntimeContext()
                                                .getTaskInfo()
                                                .getNumberOfParallelSubtasks());
                                open = true;
                            }

                            @Override
                            public void apply(
                                    TimeWindow window,
                                    Iterable<Tuple2<Long, IntType>> input,
                                    Collector<Tuple4<Long, Long, Long, IntType>> out) {

                                // validate that the function has been opened properly
                                assertTrue(open);

                                for (Tuple2<Long, IntType> in : input) {
                                    out.collect(
                                            new Tuple4<>(
                                                    in.f0,
                                                    window.getStart(),
                                                    window.getEnd(),
                                                    in.f1));
                                }
                            }
                        })
                .addSink(
                        new ValidatingSink<>(
                                new EventTimeWindowCheckpointingITCase.SinkValidatorUpdateFun(
                                        numElementsPerKey),
                                new EventTimeWindowCheckpointingITCase.SinkValidatorCheckFun(
                                        numKeys, numElementsPerKey, windowSlide)))
                .setParallelism(1);

        env.execute("PreAggregated Sliding Window Test");
    }
}
